{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "## (Bonus) Streaming data prediction using Cloud ML Engine \n",
    "\n",
    "This notebook shows how to:\n",
    "\n",
    "1. Create a PubSub Topic and Subscription.\n",
    "2. Create a Dataflow Streaming pipeline to consume messages.\n",
    "3. Use the deployed Cloud ML Engine API to make predictions.\n",
    "4. Store the data and the predictions in BigQuery.\n",
    "5. Run a stream data simulator."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Timestamp layout passed to strftime() when the simulator stamps a message.\n",
    "TIME_FORMAT = '%Y-%m-%d %H:%M:%S'\n",
    "\n",
    "# BigQuery destination used by the streaming pipeline's sink.\n",
    "DATASET = 'playground_ds'\n",
    "TABLE = 'babyweight_estimates'\n",
    "\n",
    "# Google Cloud project, staging bucket and region used for the Dataflow job.\n",
    "PROJECT = 'cloud-training-demos'\n",
    "STG_BUCKET = 'cloud-training-demos-ml'\n",
    "REGION = 'us-central1'\n",
    "\n",
    "# Pub/Sub topic the simulator publishes to, and the subscription used to inspect it.\n",
    "TOPIC = 'babyweights'\n",
    "SUBSCRIPTION = 'babyweights-sub'\n",
    "\n",
    "# Cloud ML Engine model and version that estimate_weight() sends requests to.\n",
    "MODEL_NAME = 'babyweight_estimator'\n",
    "VERSION = 'v1'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "\n",
    "# Install google-cloud-dataflow, then pin apache_beam to 2.3 and six to 1.10.\n",
    "# NOTE(review): the reason for the six pin is not stated in this notebook -- confirm before changing it.\n",
    "pip install google-cloud-dataflow\n",
    "pip install apache_beam==2.3\n",
    "pip install six==1.10"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import time\n",
    "import datetime\n",
    "from google.cloud import pubsub\n",
    "import json\n",
    "import apache_beam as beam\n",
    "import os\n",
    "# Show which Beam version got imported; the call form of print runs on Python 2 and 3.\n",
    "print(beam.__version__)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create PubSub Topic and Subscription"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Handle to the Pub/Sub topic; `topic` is reused by the publishing cell further down.\n",
    "client = pubsub.Client()\n",
    "topic = client.topic(TOPIC)\n",
    "\n",
    "# Create the topic only when it does not exist yet.\n",
    "topic_missing = not topic.exists()\n",
    "if topic_missing:\n",
    "    creating_msg = 'Creating pub/sub topic {}...'.format(TOPIC)\n",
    "    print(creating_msg)\n",
    "    topic.create()\n",
    "\n",
    "ready_msg = 'Pub/sub topic {} is up and running'.format(TOPIC)\n",
    "print(ready_msg)\n",
    "print(\"\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Submit Dataflow Stream Processing Job"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Data source (PubSub topic) and sink (BigQuery table)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Fully-qualified path of the Pub/Sub topic the Dataflow job reads from.\n",
    "pubsub_topic = \"projects/{}/topics/{}\".format(PROJECT, TOPIC)\n",
    "\n",
    "# Column name -> BigQuery type for the output table.\n",
    "schema_definition = {\n",
    "    'source_id':'INTEGER',\n",
    "    'source_timestamp':'TIMESTAMP',\n",
    "    'estimated_weight_kg':'FLOAT',\n",
    "    'is_male': 'STRING',\n",
    "    'mother_age': 'FLOAT',\n",
    "    'mother_race': 'STRING',\n",
    "    'plurality': 'FLOAT',\n",
    "    'gestation_weeks': 'INTEGER',\n",
    "    'mother_married': 'BOOLEAN',\n",
    "    'cigarette_use': 'BOOLEAN',\n",
    "    'alcohol_use': 'BOOLEAN'\n",
    "}\n",
    "\n",
    "# The sink receives the schema as a comma-separated 'name:TYPE' string. Build it\n",
    "# from the dict items directly instead of stripping characters out of the dict's\n",
    "# repr, which would also mangle any name or type containing a space, quote or brace.\n",
    "schema = ','.join(\n",
    "    '{}:{}'.format(column, bq_type) for column, bq_type in schema_definition.items()\n",
    ")\n",
    "\n",
    "print('Pub/Sub Topic URL: {}'.format(pubsub_topic))\n",
    "print('')\n",
    "print('BigQuery Dataset: {}'.format(DATASET))\n",
    "print('BigQuery Table: {}'.format(TABLE))\n",
    "print('')\n",
    "print('BigQuery Table Schema: {}'.format(schema))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Cloud ML Engine prediction function"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def estimate_weight(json_message):\n",
    "    \"\"\"Score one message with the deployed model and return the row to store.\n",
    "\n",
    "    Args:\n",
    "        json_message: JSON text of a single data point. It must contain the keys\n",
    "            `source_id` and `source_timestamp`; every other key is sent to the\n",
    "            model as an input feature.\n",
    "\n",
    "    Returns:\n",
    "        dict holding the input features plus `estimated_weight_kg`, `source_id`\n",
    "        and `source_timestamp`.\n",
    "\n",
    "    Raises:\n",
    "        KeyError: if `source_id` or `source_timestamp` is missing from the message,\n",
    "            or the API response has no `predictions` entry.\n",
    "    \"\"\"\n",
    "    # NOTE(review): presumably imported locally so the names resolve wherever the\n",
    "    # pipeline executes this function -- confirm before hoisting to the import cell.\n",
    "    import json\n",
    "    from googleapiclient import discovery\n",
    "    from oauth2client.client import GoogleCredentials\n",
    "\n",
    "    global cmle_api\n",
    "\n",
    "    # Build the API client once and reuse it on later calls. Nothing else in the\n",
    "    # notebook initialises `cmle_api`, so it is looked up through globals(): a bare\n",
    "    # `cmle_api is None` raises NameError the first time the function runs.\n",
    "    if globals().get('cmle_api') is None:\n",
    "        credentials = GoogleCredentials.get_application_default()\n",
    "        cmle_api = discovery.build('ml', 'v1', credentials=credentials,\n",
    "                              discoveryServiceUrl='https://storage.googleapis.com/cloud-ml/discovery/ml_v1_discovery.json',\n",
    "                              cache_discovery=False)\n",
    "\n",
    "    instance = json.loads(json_message)\n",
    "    # The identifiers are taken out before the request and put back afterwards.\n",
    "    source_id = instance.pop('source_id')\n",
    "    source_timestamp = instance.pop('source_timestamp')\n",
    "\n",
    "    request_data = {'instances': [instance]}\n",
    "\n",
    "    model_url = 'projects/{}/models/{}/versions/{}'.format(PROJECT, MODEL_NAME, VERSION)\n",
    "    response = cmle_api.projects().predict(body=request_data, name=model_url).execute()\n",
    "\n",
    "    estimates = [round(item[\"scores\"], 2) for item in response[\"predictions\"]]\n",
    "\n",
    "    # 0.453592 is the pound-to-kilogram factor, so the model output is presumably\n",
    "    # in pounds -- TODO confirm. The estimate is converted without an int() cast,\n",
    "    # which would discard its fractional part before the conversion.\n",
    "    estimated_weight_kg = round(estimates[0] * 0.453592, 2)\n",
    "\n",
    "    instance['estimated_weight_kg'] = estimated_weight_kg\n",
    "    instance['source_id'] = source_id\n",
    "    instance['source_timestamp'] = source_timestamp\n",
    "\n",
    "    return instance"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Beam streaming pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def run_babyweight_estimates_streaming_pipeline():\n",
    "    \"\"\"Launch the streaming Dataflow job that scores Pub/Sub messages into BigQuery.\n",
    "\n",
    "    Reads strings from `pubsub_topic`, maps each one through `estimate_weight`, and\n",
    "    writes the resulting dicts to the BigQuery table `TABLE` in `DATASET`, using\n",
    "    `schema` and the CREATE_IF_NEEDED disposition. Returns nothing.\n",
    "    \"\"\"\n",
    "    # Timestamp suffix gives each launch its own job name.\n",
    "    job_name = 'ingest-babyweight-estimates-{}'.format(datetime.datetime.now().strftime('%y%m%d-%H%M%S'))\n",
    "    # print() is called with a single argument so it behaves the same on Python 2 and 3.\n",
    "    print('Launching Dataflow job {}'.format(job_name))\n",
    "    print('Check the Dataflow jobs on Google Cloud Console...')\n",
    "\n",
    "    STG_DIR = 'gs://{}/babyweight'.format(STG_BUCKET)\n",
    "\n",
    "    options = {\n",
    "        'region': REGION,\n",
    "        'staging_location': os.path.join(STG_DIR, 'tmp', 'staging'),\n",
    "        'temp_location': os.path.join(STG_DIR, 'tmp'),\n",
    "        'job_name': job_name,\n",
    "        'project': PROJECT,\n",
    "        'streaming': True,\n",
    "        'teardown_policy': 'TEARDOWN_ALWAYS',\n",
    "        'no_save_main_session': True\n",
    "    }\n",
    "\n",
    "    opts = beam.pipeline.PipelineOptions(flags=[], **options)\n",
    "\n",
    "    pipeline = beam.Pipeline(runner=\"Dataflow\", options=opts)\n",
    "\n",
    "    (\n",
    "      pipeline | 'Read data from PubSub' >> beam.io.ReadStringsFromPubSub(topic=pubsub_topic)\n",
    "               | 'Process message' >> beam.Map(estimate_weight)\n",
    "               | 'Write to BigQuery' >> beam.io.WriteToBigQuery(project=PROJECT, dataset=DATASET, table=TABLE,\n",
    "                                                                schema=schema,\n",
    "                                                                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED\n",
    "                                                               )\n",
    "    )\n",
    "\n",
    "    pipeline.run()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Run Pipeline on Dataflow"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Build the pipeline with the Dataflow runner and start it.\n",
    "run_babyweight_estimates_streaming_pipeline()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Prepare Sample Data Points"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Sample data points that the simulator cell shuffles and publishes.\n",
    "# Boolean-like fields are written as the strings 'True'/'False', not Python bools.\n",
    "instances =  [\n",
    "      {\n",
    "        'is_male': 'True',\n",
    "        'mother_age': 26.0,\n",
    "        'mother_race': 'Asian Indian',\n",
    "        'plurality': 1.0,\n",
    "        'gestation_weeks': 39,\n",
    "        'mother_married': 'True',\n",
    "        'cigarette_use': 'False',\n",
    "        'alcohol_use': 'False'\n",
    "      },\n",
    "      {\n",
    "        'is_male': 'False',\n",
    "        'mother_age': 29.0,\n",
    "        'mother_race': 'Asian Indian',\n",
    "        'plurality': 1.0,\n",
    "        'gestation_weeks': 38,\n",
    "        'mother_married': 'True',\n",
    "        'cigarette_use': 'False',\n",
    "        'alcohol_use': 'False'\n",
    "      },\n",
    "      {\n",
    "        'is_male': 'True',\n",
    "        'mother_age': 26.0,\n",
    "        'mother_race': 'White',\n",
    "        'plurality': 1.0,\n",
    "        'gestation_weeks': 39,\n",
    "        'mother_married': 'True',\n",
    "        'cigarette_use': 'False',\n",
    "        'alcohol_use': 'False'\n",
    "      },\n",
    "      {\n",
    "        'is_male': 'True',\n",
    "        'mother_age': 26.0,\n",
    "        'mother_race': 'White',\n",
    "        'plurality': 2.0,\n",
    "        'gestation_weeks': 37,\n",
    "        'mother_married': 'True',\n",
    "        'cigarette_use': 'False',\n",
    "        'alcohol_use': 'True'\n",
    "      }\n",
    "  ]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Send Data Points to PubSub"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from random import shuffle\n",
    "\n",
    "# Simulator settings: number of batches, and the time.sleep() pause after each one.\n",
    "iterations = 10000\n",
    "sleep_time = 1\n",
    "\n",
    "# source_id is reduced modulo this value, i.e. to at most ten decimal digits.\n",
    "ID_MODULUS = 10 ** 10\n",
    "\n",
    "for i in range(iterations):\n",
    "    # Each batch sends the sample points in a fresh random order.\n",
    "    shuffle(instances)\n",
    "\n",
    "    for data_point in instances:\n",
    "        source_timestamp = datetime.datetime.now().strftime(TIME_FORMAT)\n",
    "        fingerprint = hash(str(data_point) + str(source_timestamp))\n",
    "        source_id = str(abs(fingerprint) % ID_MODULUS)\n",
    "\n",
    "        # The identifiers are written onto the sample dict itself before it is serialised.\n",
    "        data_point.update({'source_id': source_id, 'source_timestamp': source_timestamp})\n",
    "\n",
    "        message = json.dumps(data_point)\n",
    "        topic.publish(message=message, source_id=source_id, source_timestamp=source_timestamp)\n",
    "\n",
    "    batch_report = \"Batch {} was sent to {}. \\n\\r Last Message was: {}\".format(i, topic.full_name, message)\n",
    "    print(batch_report)\n",
    "    print(\"\")\n",
    "\n",
    "    time.sleep(sleep_time)\n",
    "\n",
    "print(\"Done!\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Consume PubSub Topic "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "client = pubsub.Client()\n",
    "topic = client.topic(TOPIC)\n",
    "\n",
    "# Create the subscription on first use; later runs reuse the existing one.\n",
    "subscription = topic.subscription(name=SUBSCRIPTION)\n",
    "if not subscription.exists():\n",
    "    print('Creating pub/sub subscription {}...'.format(SUBSCRIPTION))\n",
    "    subscription.create(client=client)\n",
    "\n",
    "print('Pub/sub subscription {} is up and running'.format(SUBSCRIPTION))\n",
    "print(\"\")\n",
    "\n",
    "# The result is indexed as a list of pairs whose second item is the message.\n",
    "message = subscription.pull()\n",
    "\n",
    "if not message:\n",
    "    # Nothing was delivered: report it instead of failing with IndexError below.\n",
    "    print('No message was received from subscription {}.'.format(SUBSCRIPTION))\n",
    "else:\n",
    "    received = message[0][1]\n",
    "    # Single formatted strings print the same on Python 2 and 3 (no tuple output).\n",
    "    print(\"source_id: {}\".format(received.attributes[\"source_id\"]))\n",
    "    print(\"source_timestamp: {}\".format(received.attributes[\"source_timestamp\"]))\n",
    "    print(\"\")\n",
    "    print(received.data)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Copyright 2017 Google Inc. Licensed under the Apache License, Version 2.0 (the \"License\"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
